package cn.xyt.server;

import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


import cn.xyt.dto.Message;
import cn.xyt.util.DeviceCache;
import cn.xyt.util.MessageThreadPool;
import cn.xyt.util.NettyContextUtil;

/**
 * Inbound Netty handler for decoded {@link Message} objects.
 *
 * <p>For every message it records which channel the sending device is currently
 * using and then hands the message to {@link MessageThreadPool} so that the business
 * logic in {@link MessageController} runs outside of this method.
 *
 * <p>The class is a Spring {@code @Service} and is marked {@code @Sharable}; its only
 * field is the injected controller, so it holds no per-channel state of its own.
 */
@Service
@Sharable
public class MessageInDecoder extends SimpleChannelInboundHandler<Message> {

	@Autowired
	private MessageController messageController;

	/**
	 * Registers the channel for the message's device id and schedules the business
	 * processing of the message.
	 *
	 * @param ctx     context of the channel the message arrived on
	 * @param message the decoded message; its header supplies the device id
	 * @throws Exception declared by the overridden method
	 */
	@Override
	protected void channelRead0(final ChannelHandlerContext ctx, final Message message) throws Exception {
		String deviceid = "" + message.getHeader().getDeviceid();
		Channel channel = DeviceCache.get(deviceid);
		// Store the channel when the device is unknown or has moved to a different channel.
		if ((channel == null) || (!channel.equals(ctx.channel()))) {
			DeviceCache.set(deviceid, ctx.channel());
			NettyContextUtil.setAttr(ctx, "id", deviceid);
		}

		// Run the business handling on the message thread pool.
		// NOTE(review): the message is still used after this method has returned. If Message
		// is ever reference-counted, check how that interacts with the release performed by
		// SimpleChannelInboundHandler -- confirm.
		MessageThreadPool.executeMessageTask(new Runnable() {
			@Override
			public void run() {
				try {
					boolean needResp = messageController.exec(message);
					if (needResp) {
						// Original author's note: do not use ctx.writeAndFlush(message) here; it runs
						// the outbound encoders in a different order than the channel-level write below.
						// TODO: check ctx.channel().isWritable() before writing; unrestricted writes
						// may end in an OutOfMemoryError.
						ctx.channel().writeAndFlush(message);
					}
				} catch (Exception e) {
					// NOTE(review): the failure is printed and then rethrown into the pool; what the
					// pool does with it depends on MessageThreadPool, which is not visible here.
					e.printStackTrace();
					throw new RuntimeException(e);
				}
			}
		});

	}


	/**
	 * Called when the writability of the channel changes. Intended as the trigger for
	 * draining buffered messages once the channel is writable again.
	 *
	 * @param ctx context of the channel whose writability changed
	 * @throws Exception declared by the overridden method
	 */
	@Override
	public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
		writeAndFlushNextMessageIfPossible(ctx.channel());
		// Forward the event; otherwise it stops here and later inbound handlers never see it.
		ctx.fireChannelWritabilityChanged();
	}

	/**
	 * Placeholder for draining queued outbound messages; currently does nothing.
	 *
	 * <p>Intended design, per the original notes: while {@code channel.isWritable()} is
	 * true, take the next pending message, write and flush it with a listener that calls
	 * this method again, and stop as soon as the channel is no longer writable so that
	 * {@link #channelWritabilityChanged(ChannelHandlerContext)} restarts the drain later.
	 * This class has no pending-message queue yet, so there is nothing to write.
	 *
	 * @param channel the channel that would be written to
	 */
	private void writeAndFlushNextMessageIfPossible(final Channel channel) {
		// Intentionally empty until a pending-message queue exists.
	}

}
